package com.lb.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;

/**
 * 功能描述: kafka消息监听类<br>
 * @Author: Edward
 * @Date: 2021/5/27 15:59
 */
@Component
public class KafkaConsumer {
    @Resource
    ConsumerFactory consumerFactory;

    /**
     * 消费监听
     * @param record
     */
    @KafkaListener(topics = {"test"})
    public void onMessage1(ConsumerRecord<?, ?> record){
        // 消费的哪个topic、partition的消息,打印出消息内容
        System.out.println("简单消费："+record.topic()+"-"+record.partition()+"-"+record.value());
    }


    /**
     * 功能描述: 指定topic、partition、offset消费
     * 同时监听topic1和topic2，监听topic1的0号分区、topic2的 "0号和1号" 分区，指向1号分区的offset初始值为8 <br>
     * @param record
     * @return: 
     * @Author: Edward
     * @Date: 2021/5/27 16:44
     */
    @KafkaListener(id = "consumer1",groupId = "felix-group", topicPartitions = {
            @TopicPartition(topic = "test", partitions = { "0" }),
            @TopicPartition(topic = "myTopic", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
    })
    public void onMessage2(ConsumerRecord<?, ?> record){
        // 消费的哪个topic、partition的消息,打印出消息内容
        System.out.println("简单消费："+record.topic()+"-"+record.partition()+"-"+record.value());
    }

    /**
     * 功能描述: 批量消费<br>
     * @param records
     * @return:
     * @Author: Edward
     * @Date: 2021/5/27 16:48
     */
    @KafkaListener(id = "consumer2",groupId = "felix-group", topics = "myTopic")
    public void onMessage3(List<ConsumerRecord<?, ?>> records) {
        System.out.println(">>>批量消费一次，records.size()="+records.size());
        for (ConsumerRecord<?, ?> record : records) {
            System.out.println(record.value());
        }
    }

    // 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
    @KafkaListener(topics = {"myTopic"},errorHandler = "consumerAwareErrorHandler")
    public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {
        throw new Exception("简单消费-模拟异常");
    }

    // 批量消费也一样，异常处理器的message.getPayload()也可以拿到各条消息的信息
    @KafkaListener(topics = "myTopic",errorHandler="consumerAwareErrorHandler")
    public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {
        System.out.println("批量消费一次...");
        throw new Exception("批量消费-模拟异常");
    }


    // 消息过滤器
    @Bean
    public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        // 被过滤的消息将被丢弃
        factory.setAckDiscarded(true);
        // 消息过滤策略
        factory.setRecordFilterStrategy(consumerRecord -> {
            /*if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
                return false;
            }*/
            if (!consumerRecord.value().toString().contains("yellow")) {
                return false;
            }
            //返回true消息则被过滤
            return true;
        });
        return factory;
    }

    // 消息过滤监听
    @KafkaListener(topics = {"test"},containerFactory = "filterContainerFactory")
    public void onMessage6(ConsumerRecord<?, ?> record) {
        System.out.println(record.value());
    }


    /**
     * 功能描述: 消息转发<br>
     * @param record
     * @return: {@link String}
     * @Author: Edward
     * @Date: 2021/5/27 17:34
     */
    @KafkaListener(topics = {"test"})
    @SendTo("myTopic")
    public String onMessage7(ConsumerRecord<?, ?> record) {
        return record.value()+"-forward message";
    }

}